<html>
<body>

<p>
Implementation of physical operators that use Hadoop as the execution engine
and data storage.

<h2> Design </h2>
<p>
Physical operators use the operator, plan, visitor, and optimizer framework
provided by the {@link org.apache.pig.impl.plan} package.
<p>
As with {@link org.apache.pig.impl.logicalLayer}, physical operators consist
of {@link org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators} and
{@link org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators}.  In many data
processing systems relational operators and expression operators are modeled
as different entities because they behave differently.  Pig blurs, though does
not entirely remove, this distinction because of its support for nested
operations.
<p>
Conceptually, relational operators work on an entire relation (in Pig's case,
a bag).  In terms of implementation, they operate on one record (tuple) at a
time.  This avoids needing to load the entire relation into memory before
operating on it.
<p>
Expression operators, on the other hand, operate on the assumption that they
are provided their entire input at invocation time and provide their entire
output when they are finished.
<p>
Pig's Hadoop implementation uses a pull-based model, where each operator
calls getNext() on the operator before it in the plan.  getNext() is
implemented for each of the different data types, so that operators can
request the data type they expect.  Relational operators will always expect a
tuple.  Expression operators can request any data type.
<p>
As with the logical plan, physical relational operators often have embedded
physical plans.  When a relational operator calls getNext() on its predecessor
and receives a tuple, it will attach that tuple to its embedded physical plan(s)
and then call getNext() on the root node(s) of those plan(s) in order to get the
output.  For example, the Pig Latin <code>filter A by $0 != 5</code> will
produce a POFilter object, with an embedded physical plan that consists of
POProject(0) and POConst(5), both attached to PONotEqual.  Each time
POFilter.getNext() is called, it will call its predecessor's getNext() method,
and then attach the input to POProject and POConst.  It will then call
PONotEqual.getNext().  PONotEqual will in turn call POProject.getNext() and
POConst.getNext(), and then evaluate and return the result.  If the result is
true, POFilter will return its input tuple.
If the result is false, it will call its predecessor's getNext() method and
try again.
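<p>
The control flow described above can be sketched in a few lines of Java.  This
is a simplified illustration, not Pig's actual classes: the names
<code>FilterSketch</code>, <code>Expr</code>, <code>Project</code>,
<code>Const</code>, <code>NotEqual</code>, and <code>Filter</code> are
hypothetical, tuples are modeled as <code>List&lt;Object&gt;</code>, a
<code>null</code> return stands in for the end-of-processing status, and
<code>NotEqual</code> forwards the attached tuple to its children rather than
having the filter attach to the leaves directly.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;

// Hypothetical sketch: none of these types are Pig's real classes.
final class FilterSketch {
    // Expression operators evaluate against whichever tuple is currently attached.
    interface Expr {
        void attach(List<Object> tuple);
        Object getNext();
    }

    // Returns one field of the attached tuple.
    static final class Project implements Expr {
        private final int column;
        private List<Object> input;
        Project(int column) { this.column = column; }
        public void attach(List<Object> tuple) { this.input = tuple; }
        public Object getNext() { return input.get(column); }
    }

    // Returns the same value regardless of the attached tuple.
    static final class Const implements Expr {
        private final Object value;
        Const(Object value) { this.value = value; }
        public void attach(List<Object> tuple) { /* a constant ignores its input */ }
        public Object getNext() { return value; }
    }

    // Pulls a value from each child and compares them.
    static final class NotEqual implements Expr {
        private final Expr left;
        private final Expr right;
        NotEqual(Expr left, Expr right) { this.left = left; this.right = right; }
        public void attach(List<Object> tuple) { left.attach(tuple); right.attach(tuple); }
        public Object getNext() { return !Objects.equals(left.getNext(), right.getNext()); }
    }

    // Relational operator: pulls tuples from its predecessor until one passes
    // the embedded plan, or the predecessor is exhausted (null stands in for EOP).
    static final class Filter {
        private final Iterator<List<Object>> predecessor;
        private final Expr plan;
        Filter(Iterator<List<Object>> predecessor, Expr plan) {
            this.predecessor = predecessor;
            this.plan = plan;
        }
        List<Object> getNext() {
            while (predecessor.hasNext()) {
                List<Object> tuple = predecessor.next();
                plan.attach(tuple);
                if (Boolean.TRUE.equals(plan.getNext())) {
                    return tuple;
                }
            }
            return null;
        }
    }

    // Runs the equivalent of "filter A by $0 != 5" and collects the output.
    static List<List<Object>> run(List<List<Object>> input) {
        Expr plan = new NotEqual(new Project(0), new Const(5));
        Filter filter = new Filter(input.iterator(), plan);
        List<List<Object>> out = new ArrayList<>();
        for (List<Object> t = filter.getNext(); t != null; t = filter.getNext()) {
            out.add(t);
        }
        return out;
    }
}
```

<p>
Calling <code>FilterSketch.run</code> on a list of tuples returns only those
whose first field is not 5.  The loop inside <code>Filter.getNext()</code>
corresponds to the "try again" step: a rejected tuple simply causes another
pull from the predecessor.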
<p>
Given Pig's nested data and execution models, there are places it is necessary
to move between relational and expression operators.  Consider the following
Pig Latin script:
<pre><code>A = load 'myfile';
B = group A by $0;
C = foreach B {
    C1 = filter $1 by $0 &gt; 0;
    C2 = distinct C1;
    generate group, COUNT(C2), SUM(C1.$0);
}</code></pre>
In particular, the foreach section presents some interesting challenges.
<p>
First, the foreach has three separate outputs, all of which require separate but
parallel executions.  To address this, each element of the foreach is described by a
separate embedded plan.  This can duplicate operations, as it does here:
splitting the plans for COUNT and SUM causes the <code>C1 = filter</code>
section of the script to execute twice.  But it avoids needing to
place a split operator between filter -&gt; distinct and filter -&gt; SUM.
<p>
The second issue presented by the nested logic is that the
foreach operator is going to receive a tuple with the format ($0, bag), where
bag is a collection of all the tuples with a given value for $0.  It will then
attach that to the filter.  But filter does not expect a bag.  It expects
to get tuples.  On the other end, distinct will be outputting tuples.  But
COUNT() expects C2 to be a bag that can be processed by COUNT as a whole.
<p>
To address this issue, some operators have been modified to provide
"bookend" functionality, that is, the ability to translate between relational
and expression operators.
The embedded plan for calculating the COUNT in the foreach will
look like:  POProject(1) -&gt; POFilter -&gt; PODistinct -&gt; POProject(*) -&gt; COUNT().
The first POProject(1) will have a bag attached as its input by POForeach.
But POFilter will call getNext(Tuple).  In this case, POProject will know to
open the bag and provide the tuples one at a time, until the bag is empty, at
which point it will return STATUS_EOP.  At the other end, PODistinct will be
expecting to return tuples, but POProject(*) will call getNext(bag).  To handle
this case, every relational operator is able to accumulate all of its tuples by
calling getNext(tuple) on itself until it sees STATUS_EOP, package those tuples
into a bag, and then return that bag.
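<p>
The two bookends can be sketched as a pair of small adapters.  This is an
illustration under stated assumptions rather than Pig's implementation: the
names <code>BookendSketch</code>, <code>TupleSource</code>,
<code>openBag</code>, <code>getNextBag</code>, and <code>distinct</code> are
hypothetical, bags and tuples are modeled as Java lists, <code>null</code>
plays the role of STATUS_EOP, and the streaming <code>distinct</code> is a
simplification included only so the chain has a relational operator in the
middle.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: none of these types are Pig's real classes.
final class BookendSketch {
    // Stand-in for an operator that yields tuples; null plays the role of STATUS_EOP.
    interface TupleSource {
        List<Object> getNextTuple();
    }

    // Front bookend: given an attached bag, hand out its tuples one at a time
    // until the bag is empty.
    static TupleSource openBag(List<List<Object>> bag) {
        Iterator<List<Object>> it = bag.iterator();
        return () -> it.hasNext() ? it.next() : null;
    }

    // Simplified relational operator: passes through each tuple the first time it is seen.
    static TupleSource distinct(TupleSource in) {
        Set<List<Object>> seen = new HashSet<>();
        return () -> {
            for (List<Object> t = in.getNextTuple(); t != null; t = in.getNextTuple()) {
                if (seen.add(t)) {
                    return t;
                }
            }
            return null;
        };
    }

    // Back bookend: a request for a bag drains the tuple stream until end of
    // processing and packages everything it received.
    static List<List<Object>> getNextBag(TupleSource source) {
        List<List<Object>> bag = new ArrayList<>();
        for (List<Object> t = source.getNextTuple(); t != null; t = source.getNextTuple()) {
            bag.add(t);
        }
        return bag;
    }
}
```

<p>
Composing the pieces as
<code>getNextBag(distinct(openBag(bag)))</code> mirrors the shape of the
embedded plan: a bag goes in, tuples flow through the relational operator,
and a bag comes out for an aggregate such as COUNT to consume whole.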
<p>
Third, project is being subtly overloaded here.  In cases where the script
says <code>C = foreach B generate $1</code>, this type of projection means take the second
element from the tuple and project it.  But in cases like <code>C = foreach B
generate SUM($1.$0)</code>, where $1 is a bag, this type of projection expects to
receive a bag ($1) and output a modified bag ($1 with only the first field,
$0, remaining in all the tuples in the bag).  To handle this, when POProject
sees that its predecessor is a POProject and its successor is
an expression operator, it will perform a projection on the bag (that is,
perform the specified projection on each tuple in the bag) rather than on a
tuple.
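<p>
The difference between the two kinds of projection can be shown with a
minimal sketch.  The class and method names here (<code>ProjectSketch</code>,
<code>projectTuple</code>, <code>projectBag</code>) are hypothetical, and
tuples and bags are again modeled as Java lists; how the real operator decides
between the two modes is described above and is not reproduced in this code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: none of these names are Pig's real classes or methods.
final class ProjectSketch {
    // Tuple-level projection: take one field out of the tuple.
    static Object projectTuple(List<Object> tuple, int column) {
        return tuple.get(column);
    }

    // Bag-level projection: apply the same projection to every tuple in the
    // bag, producing a new bag of single-field tuples.
    static List<List<Object>> projectBag(List<List<Object>> bag, int column) {
        List<List<Object>> out = new ArrayList<>(bag.size());
        for (List<Object> tuple : bag) {
            out.add(Collections.singletonList(tuple.get(column)));
        }
        return out;
    }
}
```

<p>
In terms of the example, <code>generate $1</code> corresponds to
<code>projectTuple(t, 1)</code>, while the <code>$1.$0</code> in
<code>SUM($1.$0)</code> corresponds to a tuple-level projection of field 1
followed by <code>projectBag(bag, 0)</code> on the resulting bag.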

</body>
</html>
